/*******************************************************************************
 * Package: com.song.bigdata.stream
 * Type:    CustomSourceFunction
 * Date:    2022-10-27 22:23
 *
 * Copyright (c) 2022 HUANENG GUICHENG TRUST CORP.,LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.stream;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 功能描述： 自定义数据源
 *
 * @author Songxianyang
 * @date 2022-10-27 22:23
 */
public class CustomSourceFunction {
    public static void main(String[] args) throws Exception {
        // 创建环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        //        3, // 尝试重启的次数
        //        Time.of(10, TimeUnit.SECONDS) // 间隔
        //));
        // 并行度
        //environment.setParallelism(1);
        DataStreamSource<Integer> integerDataStreamSource = environment.addSource(new IntSourceFunction()).setParallelism(3);
        integerDataStreamSource.print();
        environment.execute();
    }
}

/**
 * 可以设置并行度的SourceFunction
 */
class IntSourceFunction implements ParallelSourceFunction<Integer> {
    private Boolean aBoolean = true;
    @Override
    public void run(SourceContext sourceContext) throws Exception {
        Random random = new Random();
        while (aBoolean) {
            sourceContext.collect(random.nextInt());
        }
    }

    @Override
    public void cancel() {
        aBoolean = false;
    }
}
